/*
 * Copyright Strimzi authors.
 * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
 */
package io.strimzi.systemtest.operators;

import io.fabric8.kubernetes.api.model.rbac.ClusterRole;
import io.skodjob.annotations.Desc;
import io.skodjob.annotations.Label;
import io.skodjob.annotations.Step;
import io.skodjob.annotations.SuiteDoc;
import io.skodjob.annotations.TestDoc;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.IsolatedTest;
import io.strimzi.systemtest.docs.TestDocsLabels;
import io.strimzi.systemtest.enums.ClusterOperatorRBACType;
import io.strimzi.systemtest.resources.operator.ClusterOperatorConfigurationBuilder;
import io.strimzi.systemtest.resources.operator.SetupClusterOperator;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
import org.junit.jupiter.api.Tag;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static io.strimzi.systemtest.TestTags.REGRESSION;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

@Tag(REGRESSION)
@SuiteDoc(
    description = @Desc("Test suite for verifying the namespace-scoped RBAC deployment mode for the Cluster Operator, ensuring that `Role` resources are created instead of `ClusterRole` resources when operating in namespace-scoped mode."),
    beforeTestSteps = {
        @Step(value = "Skip this test suite if using OLM or Helm installation.", expected = "This test suite only runs with YAML-based installations.")
    },
    labels = {
        @Label(value = TestDocsLabels.KAFKA)
    }
)
class NamespaceRbacScopeOperatorST extends AbstractST {

    /** Label key inspected on every ClusterRole and also set on the Kafka custom resource. */
    private static final String APP_LABEL_KEY = "app";

    /** Label value that marks a resource as belonging to Strimzi for the purpose of this test. */
    private static final String APP_LABEL_VALUE = "strimzi";

    /**
     * Installs the Cluster Operator with {@link ClusterOperatorRBACType#NAMESPACE}, deploys a Kafka cluster
     * with broker and controller node pools, waits until the Kafka is ready and then asserts that no
     * {@link ClusterRole} labeled {@code app=strimzi} exists in the cluster.
     *
     * <p>The test is skipped for OLM and Helm installations.</p>
     */
    @IsolatedTest("This test case needs own Cluster Operator")
    @TestDoc(
        description = @Desc("This test verifies that when the Cluster Operator is deployed with namespace-scoped RBAC, it creates `Role` resources instead of `ClusterRole` resources, ensuring proper isolation and avoiding cluster-wide permissions."),
        steps = {
            @Step(value = "Deploy the Cluster Operator with namespace-scoped RBAC configuration.", expected = "The Cluster Operator is deployed with RBAC type set to `NAMESPACE`."),
            @Step(value = "Deploy a Kafka cluster with broker and controller node pools.", expected = "The Kafka cluster is deployed and becomes ready."),
            @Step(value = "Verify no `ClusterRole` resources with Strimzi labels exist.", expected = "No `ClusterRole` resources labeled with `app=strimzi` are found in the cluster.")
        },
        labels = {
            @Label(value = TestDocsLabels.KAFKA)
        }
    )
    void testNamespacedRbacScopeDeploysRoles() {
        // Bail out before any setup work is done: this scenario is not run for OLM or Helm installations.
        assumeFalse(Environment.isOlmInstall() || Environment.isHelmInstall());

        final TestStorage testStorage = new TestStorage(KubeResourceManager.get().getTestContext());

        SetupClusterOperator
            .getInstance()
            .withCustomConfiguration(new ClusterOperatorConfigurationBuilder()
                .withClusterOperatorRBACType(ClusterOperatorRBACType.NAMESPACE)
                .withNamespacesToWatch(TestConstants.CO_NAMESPACE + "," + Environment.TEST_SUITE_NAMESPACE)
                .build()
            )
            .install();

        KubeResourceManager.get().createResourceWithWait(
            KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3).build(),
            KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build()
        );
        // NOTE(review): the Kafka CR itself carries app=strimzi, presumably so that a ClusterRole created
        // for this cluster by mistake would be picked up by the label check below - confirm.
        KubeResourceManager.get().createResourceWithWait(KafkaTemplates.kafka(testStorage.getNamespaceName(), testStorage.getClusterName(), 3)
            .editMetadata()
                .addToLabels(APP_LABEL_KEY, APP_LABEL_VALUE)
            .endMetadata()
            .build());

        // Wait for Kafka to be Ready to ensure all potentially erroneous ClusterRole applications have happened.
        // The wait targets the namespace the Kafka was created in rather than a separately derived one.
        KafkaUtils.waitForKafkaReady(testStorage.getNamespaceName(), testStorage.getClusterName());

        // Assert that no ClusterRoles are present on the server that have app strimzi
        // Naturally returns false positives if another Strimzi operator has been installed
        final List<ClusterRole> strimziClusterRoles = KubeResourceManager.get().kubeClient().getClient().rbac().clusterRoles().list().getItems().stream()
            .filter(NamespaceRbacScopeOperatorST::hasStrimziAppLabel)
            .collect(Collectors.toList());

        // Only the names go into the failure message so that a failing run stays readable.
        final List<String> offendingNames = strimziClusterRoles.stream()
            .map(clusterRole -> clusterRole.getMetadata().getName())
            .collect(Collectors.toList());

        assertThat("ClusterRoles labeled " + APP_LABEL_KEY + "=" + APP_LABEL_VALUE + " were found: " + offendingNames,
            strimziClusterRoles, is(Collections.emptyList()));
    }

    /**
     * Checks whether the given ClusterRole carries the {@code app=strimzi} label.
     *
     * @param clusterRole ClusterRole to inspect; its label map may be {@code null}
     * @return {@code true} when the {@code app} label is present and equals {@code strimzi}
     */
    private static boolean hasStrimziAppLabel(final ClusterRole clusterRole) {
        // A resource without any labels reports null instead of an empty map.
        final Map<String, String> labels = clusterRole.getMetadata().getLabels() != null
            ? clusterRole.getMetadata().getLabels()
            : Collections.emptyMap();
        return APP_LABEL_VALUE.equals(labels.get(APP_LABEL_KEY));
    }
}
